{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "8e1ec46b",
   "metadata": {},
   "source": [
    "# Batch producing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dcfccd0f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "import platform\n",
    "\n",
    "from IPython.display import Markdown as md\n",
    "\n",
    "from fastkafka._components._subprocess import terminate_asyncio_process\n",
    "from fastkafka._testing.apache_kafka_broker import run_and_match\n",
    "from fastkafka.testing import ApacheKafkaBroker, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "61526c5c",
   "metadata": {},
   "source": [
    "If you want to send your data in batches `@produces` decorator makes that possible for you. By returning a `list` of messages you want to send in a batch the producer will collect the messages and send them in a batch to a Kafka broker.\n",
    "\n",
    "This guide will demonstrate how to use this feature."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3237efbe",
   "metadata": {},
   "source": [
    "## Return a batch from the producing function"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d0af82f7",
   "metadata": {},
   "source": [
    "To define a batch that you want to produce to Kafka topic, you need to return the `List` of the messages that you want to be batched from your producing function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "109cf37d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "from typing import List\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:\n",
       "    return [HelloWorld(msg=msg) for msg in msgs]\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "hello_world_batch = \"\"\"\n",
    "from typing import List\n",
    "\n",
    "@app.produces()\n",
    "async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:\n",
    "    return [HelloWorld(msg=msg) for msg in msgs]\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{hello_world_batch}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "034df146",
   "metadata": {},
   "source": [
    "In the example, we want to return the `HelloWorld` message class batch that is created from a list of msgs we passed into our producing function."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "86b104f9",
   "metadata": {},
   "source": [
    "Lets also prepare a backgound task that will send a batch of \"hello world\" messages when the app starts."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "11fba86b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "@app.run_in_background()\n",
       "async def prepare_and_send_hello_batch():\n",
       "    msgs=[f\"Hello world {i}\" for i in range(10)]\n",
       "    await to_hello_world(msgs)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "bg_run = \"\"\"\n",
    "@app.run_in_background()\n",
    "async def prepare_and_send_hello_batch():\n",
    "    msgs=[f\"Hello world {i}\" for i in range(10)]\n",
    "    await to_hello_world(msgs)\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{bg_run}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "84d30a25",
   "metadata": {},
   "source": [
    "## App example"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d3e2c6ad",
   "metadata": {},
   "source": [
    "We will modify the app example from [@producer basics](/docs/guides/Guide_21_Produces_Basics.md) guide to return the `HelloWorld` batch. The final app will look like this (make sure you replace the `<url_of_your_kafka_bootstrap_server>` and `<port_of_your_kafka_bootstrap_server>` with the actual values):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "673c7f8a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "app = \"\"\"\n",
    "import asyncio\n",
    "from fastkafka import FastKafka\n",
    "from pydantic import BaseModel, Field\n",
    "\n",
    "class HelloWorld(BaseModel):\n",
    "    msg: str = Field(\n",
    "        ...,\n",
    "        example=\"Hello\",\n",
    "        description=\"Demo hello world message\",\n",
    "    )\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"demo_broker\": {\n",
    "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
    "        \"description\": \"local demo kafka broker\",\n",
    "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
    "    }\n",
    "}\n",
    "\n",
    "app = FastKafka(kafka_brokers=kafka_brokers)\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2abb4c3d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "import asyncio\n",
       "from fastkafka import FastKafka\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "class HelloWorld(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"demo_broker\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local demo kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    }\n",
       "}\n",
       "\n",
       "app = FastKafka(kafka_brokers=kafka_brokers)\n",
       "\n",
       "@app.run_in_background()\n",
       "async def prepare_and_send_hello_batch():\n",
       "    msgs=[f\"Hello world {i}\" for i in range(10)]\n",
       "    await to_hello_world(msgs)\n",
       "\n",
       "from typing import List\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:\n",
       "    return [HelloWorld(msg=msg) for msg in msgs]\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "batch_example = app + bg_run + hello_world_batch\n",
    "\n",
    "md(f\"```python\\n{batch_example}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "afd85e8b",
   "metadata": {},
   "source": [
    "## Run the app"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ce98e25b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "Now we can run the app. Copy the code above in producer_example.py and run it by running\n",
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_with_key_example:app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_file = \"producer_with_key_example.py\"\n",
    "cmd = \"fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_with_key_example:app\"\n",
    "md(\n",
    "    f\"Now we can run the app. Copy the code above in producer_example.py and run it by running\\n```shell\\n{cmd}\\n```\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cedba893",
   "metadata": {},
   "source": [
    "After running the command, you should see this output in your terminal:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0049ee5c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 45714...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 45714 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 45353...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 45353 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=23092\n",
    ") as bootstrap_server:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=batch_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=5,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, f'{exit_code=}, {output.decode(\"UTF-8\")}'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "55632ae6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[46480]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'prepare_and_send_hello_batch' as background task\n",
      "[46480]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'\n",
      "[46480]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
      "[46480]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'prepare_and_send_hello_batch'\n",
      "Starting process cleanup, this may take a few seconds...\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 46480...\n",
      "[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'prepare_and_send_hello_batch'\n",
      "[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'prepare_and_send_hello_batch' to finish\n",
      "[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'prepare_and_send_hello_batch'\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Process 46480 terminated.\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "print(output.decode(\"UTF-8\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d4ec6dab",
   "metadata": {},
   "source": [
    "## Check if the batch was sent to the Kafka topic with the defined key"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c81e65bc",
   "metadata": {},
   "source": [
    "Lets check the topic and see if there are \"Hello world\" messages in the hello_world topic. In your terminal run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ef181f6",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "kafka-console-consumer.sh --topic=hello_world --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_extension = \".bat\" if platform.system() == \"Windows\" else \".sh\"\n",
    "consumer_cmd = f\"kafka-console-consumer{script_extension} --topic=hello_world --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>\"\n",
    "md(f\"```shell\\n{consumer_cmd}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "476e0e5d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 47627...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 47627 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 46861...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 46861 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 46500...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 46500 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "expected_msg = '{\"msg\":\"Hello world *[0-9]\"}'\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=23092\n",
    ") as bootstrap_server:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=batch_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=5,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, f'{exit_code=}, {output.decode(\"UTF-8\")}'\n",
    "\n",
    "    proc = await run_and_match(\n",
    "        *consumer_cmd.replace(\n",
    "            \"<address_of_your_kafka_bootstrap_server>\", bootstrap_server\n",
    "        ).split(\" \"),\n",
    "        pattern=expected_msg,\n",
    "        timeout=30,\n",
    "        num_to_match=10\n",
    "    )\n",
    "\n",
    "    await terminate_asyncio_process(proc)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "51f90b8b",
   "metadata": {},
   "source": [
    "You should see the batch of messages in your topic."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6ecc09af",
   "metadata": {},
   "source": [
    "## Batch key"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1266f878",
   "metadata": {},
   "source": [
    "To define a key for your batch like in [Defining a partition key](/docs/guides/Guide_22_Partition_Keys.md) guide you can wrap the returning value in a `KafkaEvent` class. To learn more about defining a partition ke and `KafkaEvent` class, please, have a look at [Defining a partition key](/docs/guides/Guide_22_Partition_Keys.md) guide.\n",
    "\n",
    "Let's demonstrate that."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e6859c55",
   "metadata": {},
   "source": [
    "To define a key, we just need to modify our producing function, like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e0336bb1",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "from typing import List\n",
       "from fastkafka import KafkaEvent\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:\n",
       "    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b\"my_key\")\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "hello_world_batch_key = \"\"\"\n",
    "from typing import List\n",
    "from fastkafka import KafkaEvent\n",
    "\n",
    "@app.produces()\n",
    "async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:\n",
    "    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b\"my_key\")\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{hello_world_batch_key}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f7e890c",
   "metadata": {},
   "source": [
    "Now our app looks like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "463db2f9",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "import asyncio\n",
       "from fastkafka import FastKafka\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "class HelloWorld(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"demo_broker\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local demo kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    }\n",
       "}\n",
       "\n",
       "app = FastKafka(kafka_brokers=kafka_brokers)\n",
       "\n",
       "@app.run_in_background()\n",
       "async def prepare_and_send_hello_batch():\n",
       "    msgs=[f\"Hello world {i}\" for i in range(10)]\n",
       "    await to_hello_world(msgs)\n",
       "\n",
       "from typing import List\n",
       "from fastkafka import KafkaEvent\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:\n",
       "    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b\"my_key\")\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "batch_key_example = app + bg_run + hello_world_batch_key\n",
    "\n",
    "md(f\"```python\\n{batch_key_example}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6f0832bc",
   "metadata": {},
   "source": [
    "## Check if the batch was sent to the Kafka topic"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6ce14634",
   "metadata": {},
   "source": [
    "Lets check the topic and see if there are \"Hello world\" messages in the hello_world topic, containing a defined key. In your terminal run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3934d700",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "kafka-console-consumer.sh --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_extension = \".bat\" if platform.system() == \"Windows\" else \".sh\"\n",
    "consumer_cmd = f\"kafka-console-consumer{script_extension} --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>\"\n",
    "md(f\"```shell\\n{consumer_cmd}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "31ee42ec",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 49116...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 49116 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 48349...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 48349 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 47988...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 47988 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "expected_msg = 'my_key\t{\"msg\":\"Hello world *[0-9]\"}'\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=23092\n",
    ") as bootstrap_server:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=batch_key_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=5,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, f'{exit_code=}, {output.decode(\"UTF-8\")}'\n",
    "\n",
    "    proc = await run_and_match(\n",
    "        *consumer_cmd.replace(\n",
    "            \"<address_of_your_kafka_bootstrap_server>\", bootstrap_server\n",
    "        ).split(\" \"),\n",
    "        pattern=expected_msg,\n",
    "        timeout=30,\n",
    "        num_to_match=1\n",
    "    )\n",
    "\n",
    "    await terminate_asyncio_process(proc)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4ae5ba3c",
   "metadata": {},
   "source": [
    "You should see the batch of messages with the defined key in your topic."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
